Solutions/Auth0/Data Connectors/Auth0Connector/main.py (321 lines of code) (raw):

"""Azure Function that pulls Auth0 tenant logs and forwards them to Microsoft Sentinel.

On every timer tick the function loads the last processed log id from the
state file, requests newer log events from the Auth0 Management API
(``/api/v2/logs``), flattens a few nested fields, sends the events to the Log
Analytics workspace and stores the new checkpoint.
"""

import json
import logging
import os
import re
import time
from datetime import datetime as dt
from datetime import timedelta
from typing import List, Tuple, Union

import azure.functions as func
import requests

from Auth0Connector.sentinel_connector import AzureSentinelConnector
from Auth0Connector.state_manager import StateManager

WORKSPACE_ID = os.environ['WorkspaceID']
SHARED_KEY = os.environ['WorkspaceKey']
FILE_SHARE_CONNECTION_STRING = os.environ['AzureWebJobsStorage']
LOG_TYPE = 'Auth0AM'

MAX_SCRIPT_EXEC_TIME_MINUTES = 5
# Serialized fields larger than this are split into several "...PartN" columns.
FIELD_SIZE_LIMIT_BYTES = 1000 * 32

# NOTE: the file name contains a historical typo ("confing"). It is kept on
# purpose: renaming it would orphan the checkpoint of existing deployments.
STATE_FILE_PATH = 'auth0_confing.json'

logging.getLogger(
    'azure.core.pipeline.policies.http_logging_policy').setLevel(logging.ERROR)

LOG_ANALYTICS_URI = os.environ.get('logAnalyticsUri')
if not LOG_ANALYTICS_URI or str(LOG_ANALYTICS_URI).isspace():
    LOG_ANALYTICS_URI = 'https://' + WORKSPACE_ID + '.ods.opinsights.azure.com'

# The dot after "azure" is escaped so that only "...opinsights.azure.<tld>"
# hosts are accepted (an unescaped dot would match any character).
pattern = r'https:\/\/([\w\-]+)\.ods\.opinsights\.azure\.([a-zA-Z\.]+)$'
match = re.match(pattern, str(LOG_ANALYTICS_URI))
if not match:
    raise Exception("Invalid Log Analytics Uri.")

DOMAIN = os.environ['DOMAIN']
API_PATH = '/api/v2/logs'
CLIENT_ID = os.environ['CLIENT_ID']
CLIENT_SECRET = os.environ['CLIENT_SECRET']
AUDIENCE = DOMAIN + '/api/v2/'

# Number of retries after the first failed attempt of an HTTP call.
retry = 3
# Not read by this module; kept so the module-level namespace stays unchanged.
error = False


def main(mytimer: func.TimerRequest):
    """Timer-trigger entry point: load the checkpoint and process new Auth0 logs."""
    logging.info('Script started.')
    script_start_time = int(time.time())

    state_manager = StateManager(
        FILE_SHARE_CONNECTION_STRING, file_path=STATE_FILE_PATH)
    config_string = state_manager.get()
    if config_string:
        config = json.loads(config_string)
    else:
        # First run: no checkpoint has been stored yet.
        config = {"last_log_id": "", "last_date": ""}
    logging.info(f'Config loaded\n\t{config}')

    connector = Auth0Connector(
        DOMAIN, API_PATH, CLIENT_ID, CLIENT_SECRET, AUDIENCE)
    connector.get_log_events(script_start_time, config)
    logging.info('Finish script.')


class Auth0Connector:
    """Reads log events from the Auth0 Management API and ships them to Sentinel."""

    def __init__(self, domain, api_path, client_id, client_secret, audience):
        self.state_manager = StateManager(
            FILE_SHARE_CONNECTION_STRING, file_path=STATE_FILE_PATH)
        self.sentinel = AzureSentinelConnector(
            LOG_ANALYTICS_URI, WORKSPACE_ID, SHARED_KEY, LOG_TYPE, queue_size=1000)
        self.domain = domain
        self.api_path = api_path
        self.client_id = client_id
        self.client_secret = client_secret
        self.audience = audience
        self.uri = self.domain + self.api_path
        self.token = None
        self.header = None
        self.retry = retry

    def get_log_events(self, script_start_time, config: dict) -> Union[Tuple[str, List], None]:
        """Fetch new log events, post them to Log Analytics and save the checkpoint.

        Args:
            script_start_time: Unix timestamp (seconds) at which the run started;
                used to stop paging before the function runs too long.
            config: Checkpoint dict with the keys ``last_log_id`` and ``last_date``.
                It is updated in place and persisted after every processed page.

        Returns:
            ``(last_log_id, events)`` for the last page that was handled, or
            ``None`` when there was nothing to process.

        Raises:
            Exception: If no token can be obtained or the first page cannot be
                fetched after all retries.
        """
        self.token = self._get_token()
        logging.info('Token provided.')
        self.header = self._get_header()

        last_log_id = self._get_last_log_id(config)
        logging.info(f'\tLast log id extracted: {last_log_id}.')
        if last_log_id is None:
            self.update_statemarker_file(config, '', [])
            return None

        # First request: everything after the stored checkpoint.
        params = {'from': last_log_id, 'take': '100'}
        resp, events = self._fetch_page(self.uri, params=params)
        if resp is None:
            # Fail loudly with a clear message instead of tripping over an
            # unbound/erroneous response further down.
            raise Exception('Unable to retrieve Auth0 logs: retries exhausted.')
        if not events:
            self.update_statemarker_file(config, last_log_id, [])
            return None
        logging.info('\tFirst request executed.')
        logging.info(f'Response Object : {events}')

        last_log_id = self._send_events(config, events)
        logging.info('Events sent to Sentinel.')

        next_uri = self._next_link(resp)
        page_num = 1
        while next_uri and events:
            resp, page = self._fetch_page(next_uri)
            if resp is None:
                # Do not re-process the previous page; the checkpoint already
                # covers everything sent so far, the next run resumes from it.
                break
            try:
                events = page
                logging.info(f'\t#{page_num} extracted')
                page_num += 1
                if page_num % 9 == 0:
                    # Short pause every few pages to stay under the API rate limit.
                    time.sleep(1)
                if events:
                    last_log_id = self._send_events(config, events)
                if self.check_if_script_runs_too_long(script_start_time):
                    logging.info(
                        'Script is running too long. Stop processing new events. Finish script.')
                    break
                # Resolved only after the page was processed so that a page
                # without a "Link" header is still delivered.
                next_uri = self._next_link(resp)
            except Exception as err:
                logging.error(
                    "Something wrong. Exception error text: {}".format(err))
                break
        return last_log_id, events

    def _fetch_page(self, url, params=None):
        """GET one page of log events, retrying on failure.

        Returns:
            ``(response, events)`` on success or ``(None, None)`` once the
            retries are exhausted.
        """
        attempts = 0
        while True:
            try:
                resp = requests.get(url, headers=self.header, params=params)
                events = resp.json()
                # API errors arrive as a JSON object carrying "statusCode".
                if isinstance(events, dict) and 'statusCode' in events:
                    raise Exception(events.get('error', events))
                return resp, events
            except Exception as err:
                attempts += 1
                # Compare the message text: an exception object never equals a str.
                if str(err) == 'Too Many Requests':
                    time.sleep(1)
                logging.error(
                    "Something wrong. Exception error text: {}".format(err))
                if attempts > self.retry:
                    logging.error("Exceeded maximum Retries")
                    return None, None

    @staticmethod
    def _next_link(resp):
        """Return the URL between '<' and '>' of the ``Link`` header, or None."""
        link = resp.headers.get('Link')
        if not link:
            return None
        start = link.find('<')
        end = link.find('>')
        if start == -1 or end == -1 or end < start:
            return None
        return link[start + 1:end]

    def _send_events(self, config, events):
        """Send one page of events to Sentinel, save the checkpoint, return the newest log id."""
        # Newest first, so events[0] is the checkpoint for the next request.
        events.sort(key=lambda item: item['date'], reverse=True)
        last_log_id = events[0]['log_id']
        for el in events:
            self.customize_event(el)
            self.sentinel.send(el)
        self.sentinel.flush()
        self.update_statemarker_file(config, last_log_id, events)
        return last_log_id

    def _get_last_log_id(self, config: dict) -> Union[str, None]:
        """Return the checkpoint log id.

        Without a stored checkpoint the first log of the last hour is looked
        up; ``None`` is returned when that search yields nothing.
        """
        if config['last_log_id'] != '':
            return config['last_log_id']

        start_time = str(dt.now() - timedelta(hours=1))
        params = {'q': f'date:[{start_time} TO {str(dt.now())}]',
                  'sort': 'date:1'}
        resp = requests.get(self.uri, headers=self.header, params=params)
        logs = resp.json()
        if not logs:
            return None
        if isinstance(logs, dict) and 'statusCode' in logs:
            # An error object cannot be indexed like a list of logs.
            raise Exception(logs.get('error', logs))
        return logs[0]['log_id']

    def _get_token(self):
        """Request an access token via the client-credentials grant.

        Raises:
            Exception: ``'Token not provided.'`` when every attempt failed.
        """
        params = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'audience': self.audience
        }
        header = {'content-type': "application/x-www-form-urlencoded"}
        attempts = 0
        while True:
            try:
                resp = requests.post(
                    self.domain + '/oauth/token', headers=header, data=params)
                return resp.json()['access_token']
            except Exception as err:
                attempts += 1
                logging.error(
                    "Token request failed. Exception error text: {}".format(err))
                if attempts > self.retry:
                    raise Exception('Token not provided.') from err

    def _get_header(self):
        """Build the Authorization header from the current token."""
        return {'Authorization': 'Bearer ' + self.token}

    def check_if_script_runs_too_long(self, script_start_time: int) -> bool:
        """Return True once 80% of the allowed execution time has elapsed."""
        now = int(time.time())
        duration = now - script_start_time
        max_duration = int(MAX_SCRIPT_EXEC_TIME_MINUTES * 60 * 0.80)
        return duration > max_duration

    def customize_event(self, el):
        """Limit the column count of an event by serializing nested fields.

        Selected nested objects under ``details`` are replaced in place by
        their JSON string; oversized ``template`` / ``bindings`` values are
        split into ``templatePartN`` / ``bindingsPartN`` columns.

        Returns:
            The same (mutated) event.
        """
        details = el.get("details")
        if not isinstance(details, dict):
            return el

        body = details.get("body")
        # Only JSON objects are inspected; string or list bodies are left as is.
        if isinstance(body, dict):
            self._dump_field(body, "app", "metadata")
            self._dump_field(body, "transaction")
            self._dump_field(body, "user", "metadata")

        request = details.get("request")
        if isinstance(request, dict):
            self._dump_field(request, "auth")
            request_body = request.get("body")
            if isinstance(request_body, dict):
                self._dump_field(request_body, "app", "metadata")
                self._dump_field(request_body, "client")
                self._dump_field(request_body, "refresh", "token")
                self._dump_and_split(request_body, "template", "templatePart")
                self._dump_field(request_body, "user", "metadata")

        response = details.get("response")
        if isinstance(response, dict):
            response_body = response.get("body")
            if isinstance(response_body, dict):
                self._dump_field(response_body, "app", "metadata")
                self._dump_field(response_body, "flags")
                self._dump_field(response_body, "refresh", "token")
                self._dump_field(response_body, "universal", "login")
                self._dump_field(response_body, "user", "metadata")
                self._dump_and_split(response_body, "bindings", "bindingsPart")
        return el

    @staticmethod
    def _dump_field(container, *path):
        """Replace ``container[path[0]]...[path[-1]]`` by its JSON string if it exists."""
        *parents, leaf = path
        node = container
        for key in parents:
            if not isinstance(node, dict) or key not in node:
                return
            node = node[key]
        if isinstance(node, dict) and leaf in node:
            node[leaf] = json.dumps(node[leaf])

    def _dump_and_split(self, body, key, part_prefix):
        """Serialize ``body[key]``; if too large, split it into ``<part_prefix>N`` columns."""
        if key not in body:
            return
        serialized = json.dumps(body[key])
        body[key] = serialized
        if len(json.dumps(serialized).encode()) <= FIELD_SIZE_LIMIT_BYTES:
            return
        for index, part in enumerate(self._split_big_request(serialized), start=1):
            body[part_prefix + str(index)] = part
        # The original column is dropped only when the value was really split.
        if part_prefix + '2' in body:
            del body[key]

    def _check_size(self, queue):
        """Return True when the JSON encoding of ``queue`` is below the field size limit."""
        data_bytes_len = len(json.dumps(queue).encode())
        return data_bytes_len < FIELD_SIZE_LIMIT_BYTES

    def _split_big_request(self, queue):
        """Recursively halve ``queue`` until every piece passes ``_check_size``."""
        # A piece shorter than two items cannot be halved any further.
        if self._check_size(queue) or len(queue) < 2:
            return [queue]
        middle = len(queue) // 2
        return (self._split_big_request(queue[:middle])
                + self._split_big_request(queue[middle:]))

    def clear_event(self, el):
        """Remove the unsplit ``bindings`` / ``template`` columns when split parts exist."""
        details = el.get('details') if isinstance(el, dict) else None
        if not isinstance(details, dict):
            return el
        for section, part_key, full_key in (
                ('response', 'bindingsPart2', 'bindings'),
                ('request', 'templatePart2', 'template')):
            holder = details.get(section)
            body = holder.get('body') if isinstance(holder, dict) else None
            if isinstance(body, dict) and part_key in body:
                # pop() tolerates the column having been removed already.
                body.pop(full_key, None)
        return el

    def update_statemarker_file(self, config, last_log_id, events):
        """Update the state marker file with the details of the last processed event."""
        config['last_log_id'] = last_log_id
        try:
            config['last_date'] = events[0]['date'] if last_log_id else config['last_date']
        except IndexError:
            # No events on this page: keep the previous date.
            logging.info('Known Indexing Scenario. Proceed with execution')
        logging.info("new config" + str(config))
        self.state_manager.post(json.dumps(config))